Skip to content

Conversation

@TaiJuWu
Copy link
Collaborator

@TaiJuWu TaiJuWu commented Oct 22, 2025

Remove the redundant copy and change the return type to List. This
change also align ShareConsumer

Reviewers: Kirk True [email protected], Ken Huang
[email protected], Chia-Ping Tsai [email protected]

@github-actions github-actions bot added triage PRs from the community consumer clients small Small PRs labels Oct 22, 2025
Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, for this patch, left one comment

Comment on lines 338 to 344
* Return the set of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
* but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
* has yet to process the data for the partition that has already been fetched, we should not go send for more data
* until the previously-fetched data has been processed.
*
* @param buffered The set of partitions we have in our buffer
* @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also update the Javadoc accordingly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it.

// Return all partitions that are in an otherwise fetchable state *and* for which we don't already have some
// messages sitting in our buffer.
return new HashSet<>(subscriptions.fetchablePartitions(isNotBuffered));
return subscriptions.fetchablePartitions(isNotBuffered);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue do you have free cycle to take a look at this change? thanks!

@github-actions github-actions bot removed the triage PRs from the community label Oct 23, 2025
Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR.

Is there any reason not to make this change at SubscriptionState.fetchablePartitions()? That is...

@@ -482,9 +482,9 @@ public class SubscriptionState {
     }
 
     // Visible for testing
-    public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
+    public synchronized Set<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
         // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
-        List<TopicPartition> result = new ArrayList<>();
+        Set<TopicPartition> result = new HashSet<>();
         assignment.forEach((topicPartition, topicPartitionState) -> {
             // Cheap check is first to avoid evaluating the predicate if possible
             if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || isFetchableAndSubscribed(topicPartition, topicPartitionState))

As @chia7712 mentioned here, it's already a distinct set, so we could reduce ambiguity by addressing the problem at "the source."

That would still require the change to remove the duplicate set in AbstractFetch as well as a minor change to ShareConsumeRequestManager and SubscriptionStateTest.

Thoughts?

@TaiJuWu
Copy link
Collaborator Author

TaiJuWu commented Oct 28, 2025

Thanks for the PR.

Is there any reason not to make this change at SubscriptionState.fetchablePartitions()? That is...

@@ -482,9 +482,9 @@ public class SubscriptionState {
     }
 
     // Visible for testing
-    public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
+    public synchronized Set<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
         // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
-        List<TopicPartition> result = new ArrayList<>();
+        Set<TopicPartition> result = new HashSet<>();
         assignment.forEach((topicPartition, topicPartitionState) -> {
             // Cheap check is first to avoid evaluating the predicate if possible
             if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || isFetchableAndSubscribed(topicPartition, topicPartitionState))

As @chia7712 mentioned here, it's already a distinct set, so we could reduce ambiguity by addressing the problem at "the source."

That would still require the change to remove the duplicate set in AbstractFetch as well as a minor change to ShareConsumeRequestManager and SubscriptionStateTest.

Thoughts?

command: ./jmh-benchmarks/jmh.sh SubscriptionStateBenchmark
If we do that change it will impact performance, in current version

Benchmark                                                   (partitionCount)  (topicCount)  Mode  Cnt  Score   Error  Units
SubscriptionStateBenchmark.testFetchablePartitions                        50          5000  avgt   15  8.994 ± 0.149  ms/op
SubscriptionStateBenchmark.testHasAllFetchPositions                       50          5000  avgt   15  5.593 ± 0.475  ms/op
SubscriptionStateBenchmark.testPartitionsNeedingValidation                50          5000  avgt   15  5.580 ± 0.074  ms/op
JMH benchmarks done

commit 1ba4072

Benchmark                                                   (partitionCount)  (topicCount)  Mode  Cnt   Score   Error  Units
SubscriptionStateBenchmark.testFetchablePartitions                        50          5000  avgt   15  36.920 ± 1.396  ms/op
SubscriptionStateBenchmark.testHasAllFetchPositions                       50          5000  avgt   15   4.441 ± 0.110  ms/op
SubscriptionStateBenchmark.testPartitionsNeedingValidation                50          5000  avgt   15   5.349 ± 0.052  ms/op
JMH benchmarks done

We can see the hot path (SubscriptionStateBenchmark.testFetchablePartitions) performance is becoming poor.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712 chia7712 merged commit 0072bd9 into apache:trunk Oct 29, 2025
23 checks passed
@TaiJuWu TaiJuWu deleted the KAFKA-19820 branch October 29, 2025 13:46
chia7712 pushed a commit that referenced this pull request Oct 30, 2025
…ns (#20790)

This PR is a minor improvement on the [previous
PR](#20745), correcting the javadoc
and comment information.

Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
 <[email protected]>
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…artitions (apache#20745)

Remove the redundant copy and change the return type to List.  This
change also align `ShareConsumer`

Reviewers: Kirk True <[email protected]>, Ken Huang
 <[email protected]>, Chia-Ping Tsai <[email protected]>
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…ns (apache#20790)

This PR is a minor improvement on the [previous
PR](apache#20745), correcting the javadoc
and comment information.

Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants